// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "multi_cast_data_stream_source.h"

#include "common/status.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/exec/operator.h"
#include "vec/core/block.h"
#include "vec/core/materialize_block.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
                                                                         OperatorXBase* parent)
        : Base(state, parent), _helper(parent->runtime_filter_descs()) {}

Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_init_timer);
    auto& p = _parent->cast<Parent>();
    // Pass operator profile to multi cast data streamer
    // so that it could get common_profile and custom_profile as same time.
    _shared_state->multi_cast_data_streamer->set_source_profile(p._consumer_id, operator_profile());
    _shared_state->multi_cast_data_streamer->set_dep_by_sender_idx(p._consumer_id, _dependency);
    _wait_for_rf_timer = ADD_TIMER(common_profile(), "WaitForRuntimeFilter");
    _filter_timer = ADD_TIMER(custom_profile(), "FilterTime");
    _get_data_timer = ADD_TIMER(custom_profile(), "GetDataTime");
    _materialize_data_timer = ADD_TIMER(custom_profile(), "MaterializeDataTime");

    // TODO: Not sure if runtime profile info shuold be added to common_profile or custom_profile
    // init profile for runtime filter
    RETURN_IF_ERROR(_helper.init(state, false, p.dest_id_from_sink(), p.operator_id(),
                                 _filter_dependencies, p.get_name() + "_FILTER_DEPENDENCY"));
    return Status::OK();
}

Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
    SCOPED_TIMER(exec_time_counter());
    SCOPED_TIMER(_open_timer);
    RETURN_IF_ERROR(Base::open(state));
    auto& p = _parent->cast<Parent>();
    RETURN_IF_ERROR(
            _helper.acquire_runtime_filter(state, _conjuncts, p._multi_cast_output_row_descriptor));
    _output_expr_contexts.resize(p._output_expr_contexts.size());
    for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
        RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, _output_expr_contexts[i]));
    }
    return Status::OK();
}

Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
    if (_closed) {
        return Status::OK();
    }

    SCOPED_TIMER(_close_timer);
    SCOPED_TIMER(exec_time_counter());
    int64_t rf_time = 0;
    for (auto& dep : _filter_dependencies) {
        rf_time += dep->watcher_elapse_time();
    }
    COUNTER_SET(_wait_for_rf_timer, rf_time);
    _helper.collect_realtime_profile(custom_profile());
    return Base::close(state);
}

Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                       vectorized::Block* block, bool* eos) {
    //auto& local_state = get_local_state(state);
    auto& local_state = get_local_state(state);
    SCOPED_TIMER(local_state.exec_time_counter());
    vectorized::Block tmp_block;
    vectorized::Block* output_block = block;
    if (!local_state._output_expr_contexts.empty()) {
        output_block = &tmp_block;
    }
    {
        SCOPED_TIMER(local_state._get_data_timer);
        RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull(
                state, _consumer_id, output_block, eos));
    }

    int arrived_rf_num = 0;
    RETURN_IF_ERROR(local_state._helper.try_append_late_arrival_runtime_filter(
            state, &arrived_rf_num, local_state._conjuncts, _multi_cast_output_row_descriptor));

    if (!local_state._conjuncts.empty() && !output_block->empty()) {
        SCOPED_TIMER(local_state._filter_timer);
        RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
                                                               output_block->columns()));
    }

    if (!local_state._output_expr_contexts.empty() && output_block->rows() > 0) {
        SCOPED_TIMER(local_state._materialize_data_timer);
        RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
                local_state._output_expr_contexts, *output_block, block, true));
        vectorized::materialize_block_inplace(*block);
    }
    return Status::OK();
}

} // namespace doris::pipeline
